package cn.aiyuan.listen;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Spring-managed Kafka listener that consumes messages from the
 * {@link #TOPIC_MSG} topic, logs each payload and acknowledges it.
 *
 * @author wangyuan
 * @date 2021/10/15
 */
@Component
@Slf4j
public class TestListen {

    /** Name of the Kafka topic this listener subscribes to. */
    public static final String TOPIC_MSG = "aiyuan_flink_msg";

    /**
     * Handles a single message received from {@link #TOPIC_MSG}: writes the
     * payload to the log and then acknowledges the record.
     *
     * <p>NOTE(review): an {@link Acknowledgment} argument presumably requires the
     * listener container to run in a manual ack mode - confirm against the
     * Kafka consumer configuration of this project.
     *
     * @param msg the message payload as a string
     * @param ack handle used to acknowledge the consumed record
     */
    @KafkaListener(topics = TOPIC_MSG)
    public void topic_test(String msg, Acknowledgment ack) {
        // A successfully received message is routine traffic, not a failure:
        // log it at INFO so ERROR stays reserved for real problems.
        log.info(msg);
        // Acknowledge only after the payload has been logged.
        ack.acknowledge();
    }

}
